[FLINK-39697] Bump Flink version to 2.2.1#123
Conversation
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
| <version>${flink.version}</version> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
There was a problem hiding this comment.
Duplicate flink-table-planner dependency. Remove one.
| * | ||
| * @param dataType The row data type whose field names should be stripped. | ||
| * @param prefix The prefix to remove from each field name. | ||
| * @return A new data type with the prefix removed from field names. |
There was a problem hiding this comment.
New utility class lacks unit tests. Add TableDataTypeUtilsTest to verify stripRowPrefix() and renameRowFields().
| public PulsarSourceFetcherManager( | ||
| FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue, | ||
| Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier, | ||
| Configuration configuration) { |
There was a problem hiding this comment.
Constructor signature change is correct for Flink 2.x. Consider adding Javadoc to note the elementsQueue parameter removal.
| @Override | ||
| public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility( | ||
| TypeSerializer<T> newSerializer) { | ||
| TypeSerializerSnapshot<T> oldSerializerSnapshot) { |
There was a problem hiding this comment.
Always returning compatibleAsIs() may be too permissive. Document the serializer compatibility assumptions or add validation logic.
|
|
||
| <japicmp.skip>false</japicmp.skip> | ||
| <japicmp.referenceVersion>3.0.0-1.16</japicmp.referenceVersion> | ||
| <japicmp.skip>true</japicmp.skip> |
There was a problem hiding this comment.
japicmp.skip=true is correct for the first Flink 2.x release. Re-enable in subsequent versions to track API compatibility.
| matrix: | ||
| flink: [ 1.20.3 ] | ||
| jdk: [ '8, 11, 17' ] | ||
| flink: [ 2.2.1 ] |
There was a problem hiding this comment.
JDK 8 removed correctly per Flink 2.x requirements. Update README.md to document minimum JDK 11 requirement.
Purpose of the change
Upgrade Flink dependency from 1.20.3 to 2.2.1, adapting the Pulsar connector to Flink 2.x breaking API changes while preserving existing behavior and test coverage.
Brief change log
flink.versionfrom1.20.3to2.2.1.TwoPhaseCommittingSink→Sink+SupportsCommitter;PrecommittingSinkWriter→CommittingSinkWriter;InitContext→WriterInitContext.FutureCompletingBlockingQueuefromPulsarSourceFetcherManagerandPulsarSourceReader(now managed internally by framework).ExecutionConfig→SerializerConfiginTypeInformation.createSerializer(),PulsarSourceBuilder, andPulsarTypeInformationWrapper.TypeSerializerSnapshot.resolveSchemaCompatibility()to new signature.PulsarSinkContextImplto useWriterInitContext.getTaskInfo()for subtask metadata.TableDataTypeUtilsto replace removedDataTypeUtils.stripRowPrefix()andrenameRowFields().legacy.SourceFunctionandlegacy.SinkFunctionpackage paths.PulsarWriterTest.MockInitContextto implementWriterInitContextwithTaskInfointerface.scala-reflect,scala-library) and Kryo dependencies (no longer used in Flink 2.x).flink-table-planner-loaderfromflink-table-test-utilsto fix executor instantiation conflict.${scala.binary.version}property forflink-table-plannerartifact references.japicmpcheck (no valid reference version for first Flink 2.x-based release).Verifying this change
This change is already covered by existing tests:
PulsarSinkITCaseverifies sink functionality with allDeliveryGuaranteemodes (NONE, AT_LEAST_ONCE, EXACTLY_ONCE).PulsarTableITCaseverifies Table API source/sink with multiple formats (JSON, Avro, CSV).PulsarTableOptionsTestverifies table option validation logic.PulsarWriterTestverifies writer unit behavior with mocked contexts.PulsarSourceReaderandPulsarSourceFetcherManagerchanges are covered by existing source integration tests.Significant changes
@Public(Evolving))